package com.example.service.impl;

import com.example.entity.PriceData;
import com.example.service.AreaService;
import com.example.util.TimeConstant;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import static com.example.util.HBaseTableNameConstant.*;

/**
 * @author xqz
 * @description TODO
 * @dateTime 2023/5/30 12:24
 */
@Service
public class AreaServiceImpl implements AreaService {
    @Autowired
    private Connection hbaseConnection;

    @Override
    public List<PriceData> getAllYearOfProvinceData(String rowKey) throws IOException {
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_PROVINCES_BY_YEAR_TABLE);
        // 获取到表的句柄
        Table table = hbaseConnection.getTable(tableName);
        // 创建一个'Get'操作，获取键为 "rowKey"的行
        Get get = new Get(Bytes.toBytes(rowKey));

        // 在表中执行'Get'操作
        Result result = table.get(get);

        List<PriceData> priceDataList = new ArrayList<>();
        double minValue = Double.MAX_VALUE;  // 初始化最小值为最大可能值
        double maxValue = Double.MIN_VALUE;  // 初始化最大值为最小可能值

        // 迭代列
        for (Cell cell : result.rawCells()) {
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String column = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));

            // 创建 DecimalFormat 对象，指定保留两位小数的格式
            DecimalFormat decimalFormat = new DecimalFormat("#.##");
            // 创建 PriceData 对象并添加到列表中
            String valueFormatted = decimalFormat.format(Double.valueOf(value));  // 格式化 value 的值，保留两位小数
            double parsedValue = Double.valueOf(valueFormatted);

            if (parsedValue < minValue) {
                minValue = parsedValue;  // 更新最小值
            }
            if (parsedValue > maxValue) {
                maxValue = parsedValue;  // 更新最大值
            }

            PriceData priceData = new PriceData(column, rowKey, TimeConstant.DEFAULT_MONTH, parsedValue);
            priceDataList.add(priceData);
        }

        // 关闭表连接
        table.close();

        // 创建包含最大值和最小值的 PriceData 对象，并添加到列表的开头
        priceDataList.add(0, new PriceData("Max Value", rowKey, TimeConstant.DEFAULT_MONTH, maxValue));
        priceDataList.add(0, new PriceData("Min Value", rowKey, TimeConstant.DEFAULT_MONTH, minValue));

        // 返回所有列的值
        return priceDataList;
    }

    @Override
    public PriceData getHBaseData(String province, String year, String month) throws IOException {
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_MONTHS_BY_YEAR_AND_PROVINCE_TABLE);
        // 获取到'PriceData'表的句柄
        Table table = hbaseConnection.getTable(tableName);
        // 创建一个'Get'操作，获取键为 "province_year_month"的行
        String rowKey = province + "_" + year + "_" + month;
        Get get = new Get(Bytes.toBytes(rowKey));

        // 在表中执行'Get'操作
        Result result = table.get(get);

        // 从'Result'对象中获取'info'列族中各列的值
        byte[] provinceBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("province"));
        byte[] yearBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("year"));
        byte[] monthBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("month"));
        byte[] priceBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("average_price"));

        String provinceValue = Bytes.toString(provinceBytes);
        String yearValue = Bytes.toString(yearBytes);
        String monthValue = Bytes.toString(monthBytes);
        String priceValue = Bytes.toString(priceBytes);

        if (monthValue == null || monthValue.isEmpty()) {
            monthValue = month;
            priceValue = "0";
        }

        // 关闭表连接
        table.close();
        // 创建 DecimalFormat 对象，指定保留两位小数的格式
        DecimalFormat decimalFormat = new DecimalFormat("#.##");
        // 创建 PriceData 对象并添加到列表中
        String valueFormatted = decimalFormat.format(Double.valueOf(priceValue));  // 格式化 value 的值，保留两位小数
        // 返回各列的值
        return new PriceData(provinceValue, yearValue, monthValue, Double.valueOf(valueFormatted));
        // return "Province: " + provinceValue + ", Year: " + yearValue + ", Month: " + monthValue + ", Average Price: " + priceValue;
    }

    @Override
    public int getOverallCount() throws IOException {
        TableName tableName = TableName.valueOf(OVERALL_STATS_TABLE_NAME);
        // 获取到表的句柄
        Table table = hbaseConnection.getTable(tableName);
        // 创建一个'Get'操作，获取键为 "overall"的行
        String rowKey = "overall";
        Get get = new Get(Bytes.toBytes(rowKey));

        // 在表中执行'Get'操作
        Result result = table.get(get);

        // 从'Result'对象中获取'stats'列族中的'count'列的值
        byte[] countBytes = result.getValue(Bytes.toBytes(STATS_COLUMN_FAMILY), Bytes.toBytes(COUNT_COLUMN));

        // 使用 Bytes.toLong(byte[]) 而不是 Bytes.toString(byte[])
        long countValue = Bytes.toLong(countBytes);

        // 关闭表连接
        table.close();

        if (countValue < 0) {
            System.out.println("countValue = " + countValue);
            return -1;
        }
        return (int) countValue;
    }

    @Override
    public int getProvinceCount() throws IOException {
        // 定义表名
        TableName tableName = TableName.valueOf("province_stats_table");

        // 获取到表的句柄
        Table table = hbaseConnection.getTable(tableName);

        // 创建一个 Scan 对象
        Scan scan = new Scan();

        // 添加过滤器，以获取 "stats" 列族下的 "count" 列
        FilterList filterList = new FilterList();
        filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("stats"))));
        filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("count"))));
        scan.setFilter(filterList);

        // 在表中执行 Scan 操作
        ResultScanner resultScanner = table.getScanner(scan);

        // 遍历结果并计算省份个数
        int provinceCount = 0;
        for (Result result : resultScanner) {
            provinceCount++;
        }

        // 关闭表连接
        table.close();

        return provinceCount;
    }

    @Override
    public int getMarketCount() throws IOException {
        TableName tableName = TableName.valueOf(PROVINCE_STATS_TABLE_NAME);
        // 获取到表的句柄
        Table table = hbaseConnection.getTable(tableName);

        // 创建一个 'Scan' 操作，获取表中所有行
        Scan scan = new Scan();

        // 添加'stats'列族中的'count'列
        scan.addColumn(Bytes.toBytes(STATS_COLUMN_FAMILY), Bytes.toBytes(COUNT_COLUMN));

        // 在表中执行 'Scan' 操作
        ResultScanner scanner = table.getScanner(scan);

        int totalCount = 0;
        for (Result result = scanner.next(); result != null; result = scanner.next()) {
            // 从 'Result' 对象中获取 'stats' 列族中的 'count' 列的值
            byte[] countBytes = result.getValue(Bytes.toBytes(STATS_COLUMN_FAMILY), Bytes.toBytes(COUNT_COLUMN));

            // 使用 Bytes.toLong(byte[]) 而不是 Bytes.toString(byte[])
            long countValue = Bytes.toLong(countBytes);

            // 将当前省份的市场总数累加到 totalCount
            totalCount += countValue;
        }

        // 关闭 ResultScanner 和表连接
        scanner.close();
        table.close();

        if (totalCount < 0) {
            System.out.println("totalCount = " + totalCount);
            return -1;
        }
        return totalCount;
    }
}
